Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] sync task serve and cleanup #14863

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft

Conversation

zzstoatzz
Copy link
Collaborator

@zzstoatzz zzstoatzz commented Aug 6, 2024

this PR accomplishes for Task.serve what these PRs:

did for Flow.serve

that is:

  • make serve sync (its always blocking anyways)
  • update resulting error when we Ctrl-C Task.serve to be consistent with Flow.serve
Before
» python flows/repros/serve_task.py
14:20:32.048 | INFO    | prefect.task_worker - Starting task worker...
14:20:32.049 | INFO    | prefect.task_worker - Subscribing to runs of task(s): foo
^CTraceback (most recent call last):
  File "/Users/nate/github.com/prefecthq/prefect/flows/repros/serve_task.py", line 10, in <module>
    foo.serve()
  File "/Users/nate/github.com/prefecthq/prefect/src/prefect/utilities/asyncutils.py", line 392, in coroutine_wrapper
14:20:32.709 | INFO    | prefect.task_worker - Task worker interrupted, stopping...
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nate/github.com/prefecthq/prefect/src/prefect/utilities/asyncutils.py", line 243, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/nate/github.com/prefecthq/prefect/src/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nate/github.com/prefecthq/prefect/src/prefect/_internal/concurrency/calls.py", line 175, in result
    self._condition.wait(timeout)
  File "/opt/homebrew/Cellar/[email protected]/3.12.3/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 355, in wait
    waiter.acquire()
KeyboardInterrupt
After
» python flows/repros/serve_task.py
14:59:19.585 | INFO    | prefect.task_worker - Starting task worker...
14:59:19.586 | INFO    | prefect.task_worker - Subscribing to runs of task(s): f
^C14:59:20.559 | INFO    | prefect.task_worker - Received KeyboardInterrupt, shutting down...

this PR also moves the creation of the anyio task group and capacity limiter from TaskWorker.__init__ to TaskWorker.__aenter__ so that the task serve utility can be sync like the flow utility.

Copy link

codspeed-hq bot commented Aug 6, 2024

CodSpeed Performance Report

Merging #14863 will not alter performance

Comparing sync-task-serve-incremental (7118b34) with main (fa0df1e)

Summary

✅ 3 untouched benchmarks

@zzstoatzz zzstoatzz changed the base branch from main to sync-task-serve August 6, 2024 20:41
@zzstoatzz zzstoatzz changed the base branch from sync-task-serve to main August 6, 2024 20:41
Copy link
Contributor

This pull request is stale because it has been open 14 days with no activity. To keep this pull request open remove stale label or comment.

Copy link
Contributor

This pull request is stale because it has been open 14 days with no activity. To keep this pull request open remove stale label or comment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants